package http_proxy

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
	"gitee.com/dennis-kk/service-box-go/internal/descriptor"
	"gitee.com/dennis-kk/service-box-go/internal/jsonpb"
	"gitee.com/dennis-kk/service-box-go/util/slog"
	"gitee.com/dennis-kk/service-box-go/util/tools"
	"github.com/gin-gonic/gin"
)

const (
	// defaultJsonPbCoder is the coder type name handed to
	// jsonpb.MakeJsonPbWithType when the JSON/protobuf coder is created.
	defaultJsonPbCoder = "jsonpbcpp"
)

// Life-cycle states stored in HttpProxy.status.
const (
	StatusClosed   = iota // the service has been shut down
	StatusResolved        // the service is running
	StatusUpdating        // the service is being updated
	StatusClosing         // the service is shutting down
)

// Sentinel errors reported by the HTTP proxy.
var (
	// ErrInvalidHttpAddress is returned when the listen address is empty or malformed.
	ErrInvalidHttpAddress     = errors.New("invalid http address ")
	// ErrNoEffectiveServicePath is returned when the service config path is empty
	// or is not a usable directory.
	ErrNoEffectiveServicePath = errors.New("no effective service config path ")
	// ErrInitServiceDBError is returned when the service descriptor database
	// cannot be created.
	ErrInitServiceDBError     = errors.New("create service data base error ")
	// ErrRequestNotFound is returned when an RPC response has no cached HTTP call.
	ErrRequestNotFound        = errors.New("http call info not found ")
	// ErrRequestExpired is returned when the HTTP call was already finished
	// by the time its RPC response arrived.
	ErrRequestExpired         = errors.New("the http request has expired ")
	ErrNoEffectiveJsonCoder   = errors.New("no effective json pb coder ")
	ErrRoutePathExists        = errors.New("route path already exists")
)

type (
	// Response is the HTTP response payload structure.
	Response map[string]interface{}

	// RouteHandle is the handler prototype for custom API methods; it runs in
	// its own goroutine.
	RouteHandle func(*Context) (int, Response)

	// CallQueue is the queue of pending HTTP call requests.
	CallQueue chan *requestInfo

	// ProxyRequestHandle is the callback invoked once an HTTP request has been
	// converted into an RPC request.
	// On success it returns the call id and a nil error; on failure it returns
	// 0 and the concrete error.
	ProxyRequestHandle func(req *protocol.ProxyRequestPackage) (uint32, error)

	// HttpProxy is an HTTP proxy: it starts an HTTP server and converts the
	// HTTP requests it receives into the plato RPC protocol.
	HttpProxy struct {
		engine          *gin.Engine                              // HTTP router engine
		server          *http.Server                             // HTTP server
		db              *descriptor.DataBase                     // service descriptor database (service configuration)
		writer          *ProxyWriter                             // log wrapper
		logger          slog.BoxLogger                           // logging module
		coder           jsonpb.IJsonPbCoder                      // protobuf coder
		address         string                                   // listen address
		shutdownTimeout time.Duration                            // shutdown timeout
		httpQueue       CallQueue                                // HTTP API request queue; producer: HTTP API handlers, consumer: main goroutine
		onHttpCall      ProxyRequestHandle                       // callback that receives converted requests
		callCache       map[uint32]*requestInfo                  // in-flight requests keyed by call id
		apis            map[string]bool                          // APIs already registered, to avoid adding them twice
		status          int32                                    // HTTP proxy service status
		rw              sync.RWMutex                             // read/write lock (taken around changedService)
		changedService  map[string]*descriptor.ServiceDescriptor // service changes detected on disk
	}

	// ProxyWriter is a log wrapper that forwards written bytes to a logger.
	ProxyWriter struct {
		logger slog.BoxLogger
	}
)

// MakeHttpProxy applies the given options to a fresh Options value and builds
// an HttpProxy from it. The proxy starts in StatusClosed. It panics when Init
// reports an error.
func MakeHttpProxy(opts ...Option) *HttpProxy {
	cfg := new(Options)
	for i := range opts {
		opts[i](cfg)
	}

	proxy := &HttpProxy{status: StatusClosed}
	if err := proxy.Init(cfg); err != nil {
		panic(err)
	}
	return proxy
}

// SemicolonMiddleware returns a gin handler that percent-encodes every ';' in
// the request's raw query string as "%3B" before passing control to the next
// handler.
func SemicolonMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// strings.Contains is false for an empty query, so no separate
		// emptiness check is needed.
		if query := c.Request.URL.RawQuery; strings.Contains(query, ";") {
			c.Request.URL.RawQuery = strings.ReplaceAll(query, ";", "%3B")
		}
		c.Next()
	}
}

// Write forwards p to the wrapped logger at Info level when a logger is set.
// It always reports the whole slice as written and never returns an error.
func (pw *ProxyWriter) Write(p []byte) (n int, err error) {
	if pw.logger == nil {
		return len(p), nil
	}
	pw.logger.Info(string(p))
	return len(p), nil
}

// Init initializes the proxy from option. It runs, in order, resource
// allocation, HTTP server setup, JSON/protobuf coder creation and service
// database setup, and returns the first error encountered.
func (hp *HttpProxy) Init(option *Options) error {
	steps := []func(*Options) error{
		hp.initResources,
		hp.initHttpServer,
		hp.initJsonPbCoder,
		hp.initServiceDB,
	}
	for _, step := range steps {
		if err := step(option); err != nil {
			return err
		}
	}
	return nil
}

// SetHttpRequestHandle installs the callback that onTick invokes with each
// HTTP request after it has been converted into an RPC proxy request.
func (hp *HttpProxy) SetHttpRequestHandle(handle ProxyRequestHandle) {
	hp.onHttpCall = handle
}

// OnRpcResponse delivers an RPC response to the HTTP call that is waiting for
// it. The call is looked up in callCache by the response's call id; the
// response buffer is converted to JSON and handed to the call together with
// an HTTP status derived from the RPC error code.
//
// It returns ErrRequestNotFound when no call is cached for the id and
// ErrRequestExpired when the call has already been finished. Once a call has
// been found it is always finished and removed from the cache on return.
//
// NOTE(review): callCache is accessed without a lock here and in onTick;
// presumably both run on the same goroutine - confirm.
func (hp *HttpProxy) OnRpcResponse(ret *protocol.ProxyRespPackage) error {
	// Look up the pending call by the id carried in the response header.
	call, found := hp.callCache[ret.Header.CallID]
	if !found || call == nil {
		hp.logger.Warn("call info %d not found !", ret.Header.CallID)
		return ErrRequestNotFound
	}

	defer func() {
		call.finish()
		delete(hp.callCache, ret.Header.CallID)
	}()

	// The call may have expired and been closed already.
	if call.isFinished() {
		hp.logger.Warn("service %q:%q request %d has expired", call.service, call.method, call.id)
		return ErrRequestExpired
	}

	// Encode the protobuf payload as a JSON string.
	resp := &responseInfo{status: http.StatusOK}
	if body, err := hp.coder.ProtoBufferToJson(call.service, call.method, ret.Buffer); err == nil {
		resp.body = body
	} else {
		hp.logger.Warn("service %q method %q response coder error %q", call.service, call.method, err.Error())
		resp.status = http.StatusInternalServerError
		resp.err = err
	}

	// An RPC-level error code overrides whatever status was chosen above.
	switch code := ret.Header.ErrorCode; code {
	case protocol.IDL_SERVICE_ERROR:
		resp.status = http.StatusInternalServerError
	case protocol.IDL_SERVICE_NOT_FOUND:
		resp.status = http.StatusNotFound
	case protocol.IDL_RPC_TIME_OUT:
		resp.status = http.StatusGatewayTimeout
	case protocol.IDL_RPC_LIMIT:
		resp.status = http.StatusForbidden
	}

	// Results go through doRet rather than a direct send on call.done.
	call.doRet(resp)

	return nil
}

// Start registers the APIs of every service known to the descriptor database,
// starts the database, binds the listen address and begins serving HTTP in a
// background goroutine. On success the proxy status becomes StatusResolved.
//
// The listening socket is bound synchronously, so an address that cannot be
// bound (port in use, bad host, ...) is returned as an error from Start
// instead of surfacing later from the serving goroutine, and StatusResolved
// is only set once the socket is actually accepting connections.
func (hp *HttpProxy) Start() error {
	// Load the services already held in memory by the database.
	if err := hp.loadServiceFromDB(); err != nil {
		return err
	}

	// Start the database module.
	if err := hp.db.Start(); err != nil {
		return err
	}

	// Bind before spawning the goroutine so bind failures are reported here.
	listener, err := net.Listen("tcp", hp.address)
	if err != nil {
		hp.logger.Warn("start http server at %q error %q", hp.address, err.Error())
		return err
	}

	server := &http.Server{
		Addr:    hp.address,
		Handler: hp.engine,
	}
	hp.server = server

	go func() {
		// Serve closes the listener when it returns; ErrServerClosed is the
		// normal result of ShutDown and is not an error.
		if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
			hp.logger.Fatal("start http server at %q error %q", hp.address, err.Error())
		}
	}()

	atomic.StoreInt32(&hp.status, StatusResolved)

	return nil
}

// Tick performs one round of work: it first drains the queued HTTP requests
// (onTick) and then applies any pending service hot-reload (onServiceReload).
func (hp *HttpProxy) Tick() {
	// Business calls handled on the main thread.
	hp.onTick()
	// Hot-reload logic.
	hp.onServiceReload()
}

// IsResolved reports whether the proxy is currently able to serve requests,
// i.e. whether its status is StatusResolved.
func (hp *HttpProxy) IsResolved() bool {
	return atomic.LoadInt32(&hp.status) == StatusResolved
}

// ShutDown stops the proxy: it marks the status as StatusClosing, gracefully
// shuts the HTTP server down within shutdownTimeout, closes the request
// queue, marks the status as StatusClosed and finally releases the coder and
// the service database.
//
// It is safe to call when Start never created the HTTP server (for example
// because Start failed early); the server shutdown step is then skipped.
func (hp *HttpProxy) ShutDown() {
	// Flag the proxy as closing so no new requests are accepted.
	atomic.StoreInt32(&hp.status, StatusClosing)

	// Graceful shutdown, bounded by the configured timeout.
	ctx, cancel := context.WithTimeout(context.Background(), hp.shutdownTimeout)
	defer cancel()

	// hp.server is only assigned by Start; guard against a nil dereference.
	if hp.server != nil {
		if err := hp.server.Shutdown(ctx); err != nil {
			hp.logger.Error("shutdown http server error %q", err.Error())
		} else {
			hp.logger.Info("stop http server successful !")
		}
	}

	// Close the request queue.
	close(hp.httpQueue)
	atomic.StoreInt32(&hp.status, StatusClosed)

	// Release dependent modules. The coder's UnInit error is deliberately
	// ignored: shutdown is best-effort at this point.
	_ = hp.coder.UnInit()

	hp.db.ShutDown()
}

// initResources allocates the containers the proxy needs: the request queue
// (capacity option.CacheSize, or 1024 when that is zero), the call cache, the
// pending-change map and the registered-API set. It always returns nil.
func (hp *HttpProxy) initResources(option *Options) error {
	queueSize := option.CacheSize
	if queueSize == 0 {
		queueSize = 1024
	}

	hp.httpQueue = make(CallQueue, queueSize)
	hp.callCache = map[uint32]*requestInfo{}
	hp.changedService = map[string]*descriptor.ServiceDescriptor{}
	hp.apis = map[string]bool{}
	return nil
}

// initHttpServer creates the gin engine, wires logging, recovery and the
// semicolon middleware, validates the listen address and records the address
// and shutdown timeout on the proxy.
//
// option.Address must be a "host:port" string as understood by
// net.SplitHostPort (the host may be empty, and bracketed IPv6 hosts such as
// "[::1]:8080" are accepted); otherwise ErrInvalidHttpAddress is returned.
// option.ShutdownTimeout is interpreted as milliseconds and defaults to 5000
// when zero (the default is also written back into option).
func (hp *HttpProxy) initHttpServer(option *Options) error {
	// Create the router engine.
	hp.engine = gin.New()

	// Route gin's own output through the proxy logger when one is configured.
	if option.logger != nil {
		hp.logger = option.logger
		hp.writer = &ProxyWriter{
			logger: option.logger.Children(
				slog.WithAppName("http_proxy"),
			),
		}
		gin.DefaultWriter = hp.writer
	}
	// Access-log format.
	hp.engine.Use(gin.LoggerWithFormatter(customLogFormatter))

	// Panic recovery.
	hp.engine.Use(httpProxyWithLogger(option.logger))

	// TODO: set up request tracing.

	// Semicolon escaping for query strings.
	hp.engine.Use(SemicolonMiddleware())

	// The address must be configured.
	if len(option.Address) <= 0 {
		hp.logger.Warn("not effective server address config : <nil>")
		return ErrInvalidHttpAddress
	}

	// The address must be a well-formed host:port pair. net.SplitHostPort is
	// used instead of counting ':' so that bracketed IPv6 literals pass.
	if _, _, err := net.SplitHostPort(option.Address); err != nil {
		hp.logger.Warn("invalid http proxy address config  %q", option.Address)
		return ErrInvalidHttpAddress
	}

	// gin mode: debug, release or test.
	gin.SetMode(option.Mode)

	hp.address = option.Address

	// Shutdown timeout, with a default when unset.
	if option.ShutdownTimeout == 0 {
		option.ShutdownTimeout = 5000
	}

	hp.shutdownTimeout = time.Duration(option.ShutdownTimeout) * time.Millisecond

	return nil
}

// initJsonPbCoder creates the JSON/protobuf coder for the services found
// under option.ServicePath and stores it on the proxy.
// It returns ErrNoEffectiveServicePath when the path is empty or is not an
// effective directory, or the error reported by the coder factory.
func (hp *HttpProxy) initJsonPbCoder(option *Options) error {
	servicePath := option.ServicePath

	if len(servicePath) == 0 {
		return ErrNoEffectiveServicePath
	}

	if ok := tools.IsEffectiveDir(servicePath); !ok {
		hp.logger.Warn("%q not a effective config path ", servicePath)
		return ErrNoEffectiveServicePath
	}

	pbCoder, err := jsonpb.MakeJsonPbWithType(
		defaultJsonPbCoder,
		jsonpb.WithConfigPath(servicePath),
	)
	if err == nil {
		hp.coder = pbCoder
	}
	return err
}

// initServiceDB creates and initializes the service descriptor database for
// ".json" files under option.ServicePath and registers onServiceChange as its
// change watcher.
//
// It returns ErrInitServiceDBError when the database cannot be created. When
// database initialization fails, the database is shut down, hp.db is reset to
// nil and the initialization error is returned; no watcher is registered in
// that case.
func (hp *HttpProxy) initServiceDB(option *Options) error {

	// Create the database.
	hp.db = descriptor.MakeDataBase()

	if hp.db == nil {
		hp.logger.Warn("create service database error ")
		return ErrInitServiceDBError
	}

	err := hp.db.Init(
		descriptor.WithLogger(option.logger),
		descriptor.WithConfigPath(option.ServicePath),
		descriptor.WithConfigSuffix(".json"),
	)

	if err != nil {
		hp.db.ShutDown()
		hp.db = nil
		hp.logger.Warn("init service error %q", err.Error())
		// Return here: hp.db is nil now, so registering the watcher below
		// would dereference a nil database.
		return err
	}

	hp.db.AddWatcher(hp.onServiceChange)

	return nil
}

// loadServiceFromDB walks every service held by the descriptor database and
// registers its HTTP APIs. The service uuid is parsed as a base-10 integer.
// It returns the first error produced by the walk, after logging it.
func (hp *HttpProxy) loadServiceFromDB() error {

	err := hp.db.RangeServices(func(path string, sd *descriptor.ServiceDescriptor, err error) error {
		// Propagate an error handed in by the walker instead of silently
		// overwriting it below.
		// NOTE(review): assumes a non-nil err means this entry is unusable
		// (filepath.Walk-style callback) - confirm against RangeServices.
		if err != nil {
			hp.logger.Warn("range service %q err %q", path, err.Error())
			return err
		}

		// Nothing to register without a descriptor.
		if sd == nil {
			hp.logger.Warn("range service %q got an empty descriptor", path)
			return nil
		}

		// Convert the uuid.
		serviceUuid, err := strconv.ParseInt(sd.Uuid, 10, 64)
		if err != nil {
			hp.logger.Warn("parse service's uuid %q err %q", sd.Uuid, err.Error())
			return err
		}
		return hp.addServiceAPI(uint64(serviceUuid), sd)
	})

	if err != nil {
		hp.logger.Warn("load http gateway api error %q", err.Error())
	}

	return err
}

// onRequest enqueues req on httpQueue for onTick to consume.
// The send blocks while the queue is full.
func (hp *HttpProxy) onRequest(req *requestInfo) {
	hp.httpQueue <- req
}

// addServiceAPI registers one POST route "/<service>/<method>" for every
// public or protected method of service.
//
//   - One-way methods are skipped.
//   - Public methods are routed straight to the service route handler.
//   - Protected methods run authMiddlewareFunc before the handler.
//   - All other (private) methods are not exposed.
//
// A route that is already present in hp.apis is skipped and the remaining
// methods are still processed, so methods added to an already-loaded service
// are picked up on reload. It currently always returns nil.
func (hp *HttpProxy) addServiceAPI(uuid uint64, service *descriptor.ServiceDescriptor) error {
	const uriTemplate = "/%s/%s"

	for _, m := range service.Methods {
		if m.Oneway {
			hp.logger.Info("skip one way method %q in %q", m.Name, service.Name)
			continue
		}

		// Classify the method; only public and protected ones are exposed.
		var access string
		switch {
		case m.Public:
			access = "public"
		case m.Protected:
			access = "protected"
		default:
			// Private methods must not be reachable over HTTP.
			hp.logger.Info("ignore private %s:%s method !", service.Name, m.Name)
			continue
		}

		uri := fmt.Sprintf(uriTemplate, service.Name, m.Name)

		// Already registered: skip this method only and keep going with the
		// rest of the service.
		if _, loaded := hp.apis[uri]; loaded {
			hp.logger.Info("skip %s %s, it has been loaded ", access, uri)
			continue
		}

		route := &serviceRoute{
			hp:          hp,
			service:     service.Name,
			serviceUuid: uuid,
			method:      m.Name,
			methodId:    m.Index,
			// NOTE(review): m.TimeOut is converted as-is, i.e. treated as
			// nanoseconds by time.Duration - confirm the unit.
			timeout: time.Duration(m.TimeOut),
		}

		if m.Public {
			hp.engine.POST(uri, route.onRequest)
		} else {
			hp.engine.POST(uri, authMiddlewareFunc, route.onRequest)
		}
		hp.apis[uri] = true

		hp.logger.Info("new %s API %s was added to http gateway with given timeout %d", access, uri, m.TimeOut)
	}

	return nil
}

// onServiceChange reacts to service file changes reported by the descriptor
// database. Removals are only logged as unsupported. Modified and added
// services are recorded in changedService (keyed by uuid) under the write
// lock, to be applied later by onServiceReload. Any other change type is
// ignored. It always returns nil.
func (hp *HttpProxy) onServiceChange(changeType int, services []*descriptor.ServiceDescriptor) error {
	if changeType == descriptor.ServiceRemove {
		for i := range services {
			hp.logger.Warn("unsupported service's %s deleted ", services[i].Name)
		}
		return nil
	}

	if changeType != descriptor.ServiceModify && changeType != descriptor.SerivceAdded {
		return nil
	}

	// Service modified or added: remember it for the next reload.
	hp.rw.Lock()
	defer hp.rw.Unlock()

	for _, sd := range services {
		hp.changedService[sd.Uuid] = sd
		hp.logger.Info("receive service %s:%s change ", sd.Uuid, sd.Name)
	}
	return nil
}

// onServiceReload applies the service changes recorded by onServiceChange.
// When there are pending changes it switches the proxy to StatusUpdating,
// registers the APIs of every changed service, restarts the JSON/protobuf
// coder and switches back to StatusResolved. If the coder restart fails the
// failure is logged via Fatal and the status is left at StatusUpdating.
//
// The pending set is swapped out under the write lock before it is processed,
// so a change reported while a reload is in progress lands in the fresh map
// and is handled by the next reload instead of being discarded.
func (hp *HttpProxy) onServiceReload() {
	// Cheap check under the read lock; this is the common path on every tick.
	hp.rw.RLock()
	needReload := len(hp.changedService) != 0
	hp.rw.RUnlock()

	if !needReload {
		return
	}

	// Atomically take ownership of the pending changes.
	hp.rw.Lock()
	pending := hp.changedService
	hp.changedService = make(map[string]*descriptor.ServiceDescriptor)
	hp.rw.Unlock()

	if len(pending) == 0 {
		return
	}

	// Entering the hot-update flow: flag the server as updating.
	atomic.StoreInt32(&hp.status, StatusUpdating)

	// pending is private to this call now, so no lock is needed while the
	// routes are registered.
	for uid, service := range pending {
		// Convert the uuid.
		serviceUuid, err := strconv.ParseInt(uid, 10, 64)
		if err != nil {
			hp.logger.Warn("parse service's uuid %q err %q", uid, err.Error())
			continue
		}

		if err := hp.addServiceAPI(uint64(serviceUuid), service); err != nil {
			hp.logger.Warn("reload service %s %s error !", uid, service.Name)
		} else {
			hp.logger.Info("service %s has been reloaded", service.Uuid)
		}
	}

	// Reload the JSON/protobuf coder so it picks up the new descriptors.
	if err := hp.coder.Restart(); err != nil {
		hp.logger.Fatal("re-start json pb coder error %s", err.Error())
		return
	}

	// Back to the serving state.
	atomic.StoreInt32(&hp.status, StatusResolved)
	hp.logger.Info("hot-update http proxy API successfully!")
}

// onTick drains httpQueue without blocking. Each queued HTTP request has its
// JSON body converted to protobuf, is wrapped in a proxy request package and
// is handed to the onHttpCall callback; on success the request is stored in
// callCache under the returned call id until OnRpcResponse delivers its
// result.
//
// A request that has already been finished is dropped. A request whose body
// cannot be converted is answered with 400, and one the callback rejects
// (error or call id 0) is answered with 500. In all three cases processing
// continues with the next queued request. The loop ends when the queue is
// empty or closed, or when a nil request is received.
func (hp *HttpProxy) onTick() {
	for {
		select {
		case call, ok := <-hp.httpQueue:
			if !ok || call == nil {
				// The queue has been closed (or yielded a nil entry): stop.
				return
			}

			// The request may have waited in the queue for too long and
			// timed out already.
			if call.isFinished() {
				hp.logger.Warn("http request /%s/%s has been closed!", call.service, call.method)
				continue
			}

			// Convert the JSON body to protobuf.
			buffer, err := hp.coder.JsonStrToProtoBuffer(call.service, call.method, string(call.body))
			if err != nil {
				// Report the conversion error back to the HTTP caller.
				call.done <- &responseInfo{
					status: http.StatusBadRequest,
					err:    err,
				}
				continue
			}

			// Assemble the call header.
			msg := &protocol.ProxyRequestPackage{
				Header: &protocol.RpcProxyCallHeader{
					RpcMsgHeader: protocol.RpcMsgHeader{
						Type:   protocol.ProxyRequestMsg,
						Length: uint32(protocol.ProxyCallHeadSize + len(buffer)),
					},
					ServiceUUID: call.uid,
					MethodID:    call.mid,
					ServerID:    0,
					// The call id is not set here; the callback assigns it.
				},
				Buffer: buffer,
			}

			call.id, err = hp.onHttpCall(msg)
			if err == nil && call.id == 0 {
				// Call id 0 means failure per the ProxyRequestHandle
				// contract; make sure there is an error to report and log
				// instead of dereferencing a nil one.
				err = fmt.Errorf("no call id assigned for /%s/%s", call.service, call.method)
			}
			if err != nil {
				call.done <- &responseInfo{
					status: http.StatusInternalServerError,
					err:    err,
				}
				hp.logger.Warn("transport http request /%q/%q error %q", call.service, call.method, err.Error())
				continue
			}

			// Forwarded successfully: keep the call until its response arrives.
			hp.callCache[call.id] = call
		default:
			// Nothing left to process: end the loop.
			return
		}
	}
}

// authMiddlewareFunc is the authentication middleware for protected routes.
// When no authenticator is installed the request passes through untouched.
// Otherwise the authenticator is invoked with a Context wrapping the gin
// context; on error the chain is aborted and a 403 JSON body carrying the
// error text is written.
func authMiddlewareFunc(c *gin.Context) {
	if authenticator != nil {
		// Build the context handed to the authenticator.
		wrapped := &Context{ctx: c}
		if err := authenticator(wrapped); err != nil {
			c.Abort()
			c.JSON(http.StatusForbidden, gin.H{"error": err.Error()})
			return
		}
	}
	c.Next()
}
